package com.study.banyiyi.contorller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName KafkaProducerController
 * @Description 生产者控制层
 * @Author yangwm
 * @Date 2021/7/27 18:14
 * @Version 1.0
 */
@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @GetMapping("/send/{msg}")
    private void sendMessageByKafkaProducer(@PathVariable("msg") String msg){
        kafkaTemplate.send("topic1",msg);
    }

    /**
     * 添加回调监听
     * @param msg
     */
    @GetMapping("/send/callback/{msg}")
    private void sendMessageByKafkaProducerAndCallback(@PathVariable("msg") String msg){
        kafkaTemplate.send("topic1",msg).addCallback(sucess->{
            //消息成功发送到topic
            String topic = sucess.getRecordMetadata().topic();
            //消息发送到的分区
            int partition = sucess.getRecordMetadata().partition();
            //消息所在分区内地offset
            long offset = sucess.getRecordMetadata().offset();
            System.out.println("消息成功发送到：" + topic + partition + offset);
        },failure->{
            System.out.println("消息发送失败：" + failure.getMessage());
        });
    }
}
